BIO到NIO源码的一些事儿之NIO 上

BIO到NIO源码的一些事儿之NIO 上

前言

此篇文章会详细解读NIO的功能逐步丰满的路程,为Reactor-Netty 库的讲解铺平道路。

关于Java编程方法论-Reactor与Webflux的视频分享,已经完成了Rxjava 与 Reactor,b站地址如下:

Rxjava源码解读与分享:https://www.bilibili.com/video/av34537840

Reactor源码解读与分享:https://www.bilibili.com/video/av35326911

场景代入

接上一篇 BIO到NIO源码的一些事儿之BIO,我们来接触NIO的一些事儿。

在上一篇中,我们可以看到,我们要做到异步非阻塞,我们自己进行的是创建线程池同时对部分代码做timeout的修改来对接客户端,但是弊端也很清晰,我们转换下思维,这里举个场景例子,A班同学要和B班同学一起一对一完成任务,每对人拿到的任务是不一样的,消耗的时间有长有短,任务因为有奖励所以同学们会抢,传统模式下,A班同学和B班同学不经管理话,即便只是一个心跳检测的任务都得一起,在这种情况下,客户端根本不会有数据要发送,只是想告诉服务器自己还活着,这种情况下,假如B班再来一个同学做对接的话,就很有问题了,B班的每一个同学都可以看成服务器端的一个线程。所以,我们需要一个管理者,于是Selector就出现了,作为管理者,这里,我们往往需要管理同学们的状态,是否在等待任务,是否在接收信息,是否在输出信息等等,Selector更侧重于动作,针对于这些状态标签来做事情就可以了,那这些状态标签其实也是需要管理的,于是SelectionKey也就应运而生。接着我们需要对这些同学进行包装增强,使之携带这样的标签。同样,对于同学我们应该进一步解放双手的,比如给其配台电脑,这样,同学是不是可以做更多的事情了,那这个电脑在此处就是Buffer的存在了。
于是在NIO中最主要是有三种角色的,Buffer缓冲区,Channel通道,Selector选择器,我们都涉及到了,接下来,我们对其源码一步步分析解读。

Channel解读

赋予Channel可异步可中断的能力

有上可知,同学其实都是代表着一个个的Socket的存在,那么这里Channel就是对其进行的增强包装,也就是Channel的具体实现里应该有Socket这个字段才行,然后具体实现类里面也是紧紧围绕着Socket具备的功能来做文章的。那么,我们首先来看java.nio.channels.Channel接口的设定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface Channel extends Closeable {

/**
* Tells whether or not this channel is open.
*
* @return {@code true} if, and only if, this channel is open
*/
public boolean isOpen();

/**
* Closes this channel.
*
* <p> After a channel is closed, any further attempt to invoke I/O
* operations upon it will cause a {@link ClosedChannelException} to be
* thrown.
*
* <p> If this channel is already closed then invoking this method has no
* effect.
*
* <p> This method may be invoked at any time. If some other thread has
* already invoked it, however, then another invocation will block until
* the first invocation is complete, after which it will return without
* effect. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;

}

此处就是很直接的设定,判断Channel是否是open状态,关闭Channel的动作,我们在接下来会讲到ClosedChannelException是如何具体在代码中发生的。
有时候,一个Channel可能会被异步关闭和中断,这也是我们所需求的。那么要实现这个效果我们须得设定一个可以进行此操作效果的接口。达到的具体的效果应该是如果线程在实现这个接口的的Channel中进行IO操作的时候,另一个线程可以调用该Channel的close方法。导致的结果就是,进行IO操作的那个阻塞线程会收到一个AsynchronousCloseException异常。

同样,我们应该考虑到另一种情况,如果线程在实现这个接口的的Channel中进行IO操作的时候,另一个线程可能会调用被阻塞线程的interrupt方法(Thread#interrupt()),从而导致Channel关闭,那么这个阻塞的线程应该要收到ClosedByInterruptException异常,同时将中断状态设定到该阻塞线程之上。

这时候,如果中断状态已经在该线程设定完毕,此时在其之上的有Channel又调用了IO阻塞操作,那么,这个Channel会被关闭,同时,该线程会立即受到一个ClosedByInterruptException异常,它的interrupt状态仍然保持不变。
这个接口定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public interface InterruptibleChannel
extends Channel
{

/**
* Closes this channel.
*
* <p> Any thread currently blocked in an I/O operation upon this channel
* will receive an {@link AsynchronousCloseException}.
*
* <p> This method otherwise behaves exactly as specified by the {@link
* Channel#close Channel} interface. </p>
*
* @throws IOException If an I/O error occurs
*/
public void close() throws IOException;

}

其针对上面所提到逻辑的具体实现是在java.nio.channels.spi.AbstractInterruptibleChannel进行的,关于这个类的解析,我们来参考这篇文章InterruptibleChannel 与可中断 IO

赋予Channel可被多路复用的能力

我们在前面有说到,Channel可以被Selector进行使用,而Selector是根据Channel的状态来分配任务的,那么Channel应该提供一个注册到Selector上的方法,来和Selector进行绑定。也就是说Channel的实例要调用register(Selector,int,Object)。注意,因为Selector是要根据状态值进行管理的,所以此方法会返回一个SelectionKey对象来表示这个channelselector上的状态。关于SelectionKey,它是包含很多东西的,这里暂不提。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}
//java.nio.channels.spi.AbstractSelectableChannel#addKey
private void addKey(SelectionKey k) {
assert Thread.holdsLock(keyLock);
int i = 0;
if ((keys != null) && (keyCount < keys.length)) {
// Find empty element of key array
for (i = 0; i < keys.length; i++)
if (keys[i] == null)
break;
} else if (keys == null) {
keys = new SelectionKey[2];
} else {
// Grow key array
int n = keys.length * 2;
SelectionKey[] ks = new SelectionKey[n];
for (i = 0; i < keys.length; i++)
ks[i] = keys[i];
keys = ks;
i = keyCount;
}
keys[i] = k;
keyCount++;
}

一旦注册到Selector上,Channel将一直保持注册直到其被解除注册。在解除注册的时候会解除Selector分配给Channel的所有资源。
也就是Channel并没有直接提供解除注册的方法,那我们换一个思路,我们将Selector上代表其注册的Key取消不就可以了。这里可以通过调用SelectionKey#cancel()方法来显式的取消key。然后在Selector下一次选择操作期间进行对Channel的取消注册。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
//java.nio.channels.spi.AbstractSelectionKey#cancel
/**
* Cancels this key.
*
* <p> If this key has not yet been cancelled then it is added to its
* selector's cancelled-key set while synchronized on that set. </p>
*/
public final void cancel() {
// Synchronizing "this" to prevent this key from getting canceled
// multiple times by different threads, which might cause race
// condition between selector's select() and channel's close().
synchronized (this) {
if (valid) {
valid = false;
//还是调用Selector的cancel方法
((AbstractSelector)selector()).cancel(this);
}
}
}


//java.nio.channels.spi.AbstractSelector#cancel
void cancel(SelectionKey k) {
synchronized (cancelledKeys) {
cancelledKeys.add(k);
}
}


//在下一次select操作的时候来解除那些要求cancel的key,即解除Channel注册
//sun.nio.ch.SelectorImpl#select(long)
@Override
public final int select(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
//重点关注此方法
return lockAndDoSelect(null, (timeout == 0) ? -1 : timeout);
}
//sun.nio.ch.SelectorImpl#lockAndDoSelect
private int lockAndDoSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
synchronized (this) {
ensureOpen();
if (inSelect)
throw new IllegalStateException("select in progress");
inSelect = true;
try {
synchronized (publicSelectedKeys) {
//重点关注此方法
return doSelect(action, timeout);
}
} finally {
inSelect = false;
}
}
}
//sun.nio.ch.WindowsSelectorImpl#doSelect
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
this.timeout = timeout; // set selector timeout
processUpdateQueue();
//重点关注此方法
processDeregisterQueue();
if (interruptTriggered) {
resetWakeupSocket();
return 0;
}
...
}

/**
* sun.nio.ch.SelectorImpl#processDeregisterQueue
* Invoked by selection operations to process the cancelled-key set
*/
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);

Set<SelectionKey> cks = cancelledKeys();
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();

// remove the key from the selector
implDereg(ski);

selectedKeys.remove(ski);
keys.remove(ski);

// remove from channel's key set
deregister(ski);

SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();
}
}
}
}

这里,当Channel关闭时,无论是通过调用Channel#close还是通过打断线程的方式来对Channel进行关闭,其都会隐式的取消关于这个Channel的所有的keys,其内部也是调用了k.cancel()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
//java.nio.channels.spi.AbstractInterruptibleChannel#close
/**
* Closes this channel.
*
* <p> If the channel has already been closed then this method returns
* immediately. Otherwise it marks the channel as closed and then invokes
* the {@link #implCloseChannel implCloseChannel} method in order to
* complete the close operation. </p>
*
* @throws IOException
* If an I/O error occurs
*/
public final void close() throws IOException {
synchronized (closeLock) {
if (closed)
return;
closed = true;
implCloseChannel();
}
}
//java.nio.channels.spi.AbstractSelectableChannel#implCloseChannel
protected final void implCloseChannel() throws IOException {
implCloseSelectableChannel();

// clone keys to avoid calling cancel when holding keyLock
SelectionKey[] copyOfKeys = null;
synchronized (keyLock) {
if (keys != null) {
copyOfKeys = keys.clone();
}
}

if (copyOfKeys != null) {
for (SelectionKey k : copyOfKeys) {
if (k != null) {
k.cancel(); // invalidate and adds key to cancelledKey set
}
}
}
}

如果Selector自身关闭掉,那么Channel也会被解除注册,同时代表Channel注册的key也将变得无效:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//java.nio.channels.spi.AbstractSelector#close
public final void close() throws IOException {
boolean open = selectorOpen.getAndSet(false);
if (!open)
return;
implCloseSelector();
}
//sun.nio.ch.SelectorImpl#implCloseSelector
@Override
public final void implCloseSelector() throws IOException {
wakeup();
synchronized (this) {
implClose();
synchronized (publicSelectedKeys) {
// Deregister channels
Iterator<SelectionKey> i = keys.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
deregister(ski);
SelectableChannel selch = ski.channel();
if (!selch.isOpen() && !selch.isRegistered())
((SelChImpl)selch).kill();
selectedKeys.remove(ski);
i.remove();
}
assert selectedKeys.isEmpty() && keys.isEmpty();
}
}
}

一个channel最多可以最多只能在特定的selector注册一次。我们可以通过调用java.nio.channels.SelectableChannel#isRegistered的方法来确定是否向一个或多个Selector注册了channel。

1
2
3
4
5
6
7
8
9
//java.nio.channels.spi.AbstractSelectableChannel#isRegistered
// -- Registration --

public final boolean isRegistered() {
synchronized (keyLock) {
//我们在之前往Selector上注册的时候调用了addKey方法,即每次往//一个Selector注册一次,keyCount就要自增一次。
return keyCount != 0;
}
}

至此,继承了SelectableChannel这个类之后,这个channel就可以安全的由多个并发线程来使用。
这里,要注意的是,继承了AbstractSelectableChannel这个类之后,新创建的channel始终处于阻塞模式。然而与Selector的多路复用有关的操作必须基于非阻塞模式,所以在注册到Selector之前,必须将channel置于非阻塞模式,并且在取消注册之前,channel可能不会返回到阻塞模式。
这里,我们涉及了Channel的阻塞模式与非阻塞模式。在阻塞模式下,在Channel上调用的每个I/O操作都将阻塞,直到完成为止。 在非阻塞模式下,I/O操作永远不会阻塞,并且可以传输比请求的字节更少的字节,或者根本不传输任何字节。 我们可以通过调用channel的isBlocking方法来确定其是否为阻塞模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//java.nio.channels.spi.AbstractSelectableChannel#register
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)
throw new IllegalArgumentException();
if (!isOpen())
throw new ClosedChannelException();
synchronized (regLock) {
//此处会做判断,假如是阻塞模式,则会返回true,然后就会抛出异常
if (isBlocking())
throw new IllegalBlockingModeException();
synchronized (keyLock) {
// re-check if channel has been closed
if (!isOpen())
throw new ClosedChannelException();
SelectionKey k = findKey(sel);
if (k != null) {
k.attach(att);
k.interestOps(ops);
} else {
// New registration
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
return k;
}
}
}

所以,我们在使用的时候可以基于以下的例子作为参考:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public NIOServerSelectorThread(int port)
{
try {
//打开ServerSocketChannel,用于监听客户端的连接,他是所有客户端连接的父管道
serverSocketChannel = ServerSocketChannel.open();
//将管道设置为非阻塞模式
serverSocketChannel.configureBlocking(false);
//利用ServerSocketChannel创建一个服务端Socket对象,即ServerSocket
serverSocket = serverSocketChannel.socket();
//为服务端Socket绑定监听端口
serverSocket.bind(new InetSocketAddress(port));
//创建多路复用器
selector = Selector.open();
//将ServerSocketChannel注册到Selector多路复用器上,并且监听ACCEPT事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("The server is start in port: "+port);
} catch (IOException e) {
e.printStackTrace();
}
}

因时间关系,本篇暂时到这里,剩下的会在下一篇中进行讲解。

您的支持将鼓励我继续创作!